Course "Data processing and Visualization", IE500417, NTNU.
Note (as usual): plagiarism is strictly forbidden! You should never copy any source code from other students. If you use any code written by others (except the standard libraries: NumPy, SciPy, Pandas, etc.), provide a reference.
If the teachers see that your work is mostly copy+paste from online code snippets, the grade can be reduced.
If a case of plagiarism is detected, it will be reported to the administration.
In this assignment, you will practice digital signal processing (in a rather basic form; there will be no advanced DSP methods). You will work with two signals simultaneously. As sometimes happens, the two signals are not synchronized: they are sampled at different time moments and at different sampling rates. You will have to resample and synchronize them so that both signals have the same sample timestamps. You will then do some analysis of the signals and visualize them using line charts.
The assignment must be handed in on Blackboard. The following must be handed in:
Deadlines and grading information on Blackboard.
Step 1.1: Load the two signals from CSV files: s1.csv and s2.csv.
# Your code here
# Import libraries
import pandas as pd
import plotly.graph_objects as go
import numpy as np
# Signal 1
df1 = pd.read_csv("s1.csv")
# Signal 2
df2 = pd.read_csv("s2.csv")
Step 1.2: Do a quick analysis of what data you got: column names of each signal and number of rows.
# Your code here
print("Signal 1 Information:")
df1.info()
print()
print("Signal 2 Information:")
df2.info()
Step 1.3: One of the signals is sampled at an even frequency, the other is not. Find out which is the nonuniformly sampled signal. Store it in variable signal_x. Store the uniformly sampled signal in variable signal_u.
Note: "find out" here means "write code that finds out". If you manually assign the signal_u and signal_x variables, you won't get points for this step. The reason: manual assignment is not flexible. If the dataset changes, the remaining notebook calculations would suddenly be wrong. Flexible code that finds the necessary signals would keep working even if we swapped the s1.csv and s2.csv files.
# Your code here
# Convert string timestamps to datetime
df1["timestamp"] = pd.to_datetime(df1["time"])
df2["timestamp"] = pd.to_datetime(df2["time"])

# Differences between successive timestamps
time_diff = df1.timestamp.diff().dropna()

# Check if a series has identical values
# Reference: https://stackoverflow.com/questions/54405704/check-if-all-values-in-dataframe-column-are-the-same
def is_all_identical(series):
    a = series.to_numpy()
    return (a[0] == a).all()

if is_all_identical(time_diff):
    signal_x = df2.set_index("timestamp").s2
    signal_u = df1.set_index("timestamp").s1
else:
    signal_x = df1.set_index("timestamp").s1
    signal_u = df2.set_index("timestamp").s2
Step 1.4. Plot the two signals in a line chart:
A reference showing approximately how it could look:

# Your code here
# Plot both signals in one line chart
fig = go.Figure(data=[
    go.Scatter(name="Signal U", x=signal_u.index, y=signal_u,
               line=dict(color='green', width=2, dash='dash')),
    go.Scatter(name="Signal X", x=signal_x.index, y=signal_x,
               line=dict(color='blue', width=1))
])
fig.update_layout(
    title={
        'text': "Signal values",
        'y': 0.9,
        'x': 0.5,
        'xanchor': 'center',
        'yanchor': 'top',
        'font': dict(size=20)
    },
    xaxis_title="Sample time",
    yaxis_title="Value"
)
fig.show()
Step 1.5: Find out the sampling frequency of Signal U, save it in variable f_u.
# Your code here
# The signal is uniformly sampled, so the sampling period is
# the gap between any two consecutive timestamps
sample_time = (signal_u.index[1] - signal_u.index[0]).total_seconds()
f_u = 1 / sample_time
Step 1.6: Find out which frequencies are present in Signal U. Save the highest frequency in variable b_u, in Hz.
Hint: use Fourier transform, and find max frequency having a significant amplitude. There may be many frequencies with tiny amplitudes. Use some threshold to filter these out.
# Your code here
# Number of samples
N = len(signal_u)
# Normalized amplitude spectrum, and the frequency of each bin in Hz
fft_amplitude = np.abs(np.fft.rfft(signal_u)) / N
freqs = np.fft.rfftfreq(N, d=1 / f_u)
# Filter out bins with tiny amplitudes (numerical noise)
threshold_amplitude = 0.0001
significant_freqs = freqs[fft_amplitude > threshold_amplitude]
b_u = significant_freqs.max()
Step 1.7: Find out the minimum frequency at which Signal U should have been sampled to still contain all the information in the signal. Save it in variable fs_u.
Hint: Nyquist-Shannon theorem
# Your code here
# From Nyquist-Shannon theorem
fs_u = 2 * b_u
Step 1.8: Calculate how many % of space is wasted by storing too many samples for Signal U. I.e., if we resampled Signal U at a sampling rate fs_u, how many samples would we store, and how much is that relative to the number of samples in the CSV file?
If it is 0, why?
P.S. Don't worry about Signal X – the sampling system for it was designed by careless engineers who did not know about the Nyquist-Shannon sampling theorem. Therefore, the sampling of Signal X is not proper. But we work with what we have.
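Before coding it, it can help to sanity-check the formula on made-up numbers. A minimal sketch (the helper name and the rates below are hypothetical, not taken from the CSV files):

```python
def wasted_space_pct(f_actual: float, f_required: float) -> float:
    """Percent of stored samples that could be dropped if the signal
    were resampled from f_actual down to f_required (hypothetical helper)."""
    if f_required >= f_actual:
        return 0.0  # not oversampled, so nothing is wasted
    return (1 - f_required / f_actual) * 100

# Hypothetical: sampled at 100 Hz, but 25 Hz would have sufficed
print(wasted_space_pct(100.0, 25.0))  # → 75.0
```

If the answer is 0, the signal was sampled exactly at (or below) its Nyquist rate, so no sample is redundant.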
# Your code here
# Fraction of stored samples that would be unnecessary if we
# resampled at fs_u instead of f_u
wasted_pct = (1 - fs_u / f_u) * 100
print("% of space wasted:", wasted_pct)
Note: whenever you modify a signal, it is suggested to store the modified signal in another variable. Keep the original one intact; you may later want to compare the two.
Step 2.1: Decimate (down-sample) the Signal U to 10Hz, store it in variable su_resampled:
# Your code here
su_resampled = signal_u.resample("100ms").asfreq()
Step 2.2: Explain why the resampled signal does not contain the same information as the original signal (i.e., what information is lost?)
--- YOUR ANSWER HERE ---
The amplitude values recorded at the intermediate timestamps are lost.
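A tiny toy demonstration (synthetic data, not the CSV signals) of how decimation with `.resample(...).asfreq()` simply drops the intermediate samples:

```python
import numpy as np
import pandas as pd

# Toy 100 Hz signal: 21 samples, 10 ms apart
idx = pd.date_range("2024-01-01", periods=21, freq="10ms")
s = pd.Series(np.arange(21.0), index=idx)

# Decimate to 10 Hz: only every 10th timestamp survives
s_dec = s.resample("100ms").asfreq()
print(len(s), "->", len(s_dec))  # 21 -> 3
print(s_dec.tolist())            # [0.0, 10.0, 20.0]
```

The 18 values recorded between the surviving timestamps are gone; no operation on `s_dec` can recover them.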
From now on, in all the places where you need to do something with Signal U, use the resampled version: su_resampled.
Step 2.3: Synchronize Signal X with Signal U, store it in variable sx_resampled. I.e., if su_resampled is sampled at time moments t0, t1, …, tN, then resample Signal X at the same time moments: t0, t1, …, tN. This may involve several steps, depending on what functions/libraries you use.
Hint: see the 07-2-Resampling.ipynb example notebook on Blackboard.
Hint: your resulting sx_resampled should be 10Hz signal, not 100Hz.
# Your code here
# Up-sample Signal X onto a 10 ms (100 Hz) grid; new timestamps get NaN
sx_up_sampled = signal_x.resample("10ms").asfreq()
# Scale up before spline fitting: the underlying spline applies some
# smoothing, and larger values keep the fit closer to the samples
sx_up_sampled_big_val = sx_up_sampled * 1000
# Interpolate to fill the empty values, then undo the scaling
sx_up_sampled_int = sx_up_sampled_big_val.interpolate(method="spline", order=2) / 1000
# Down-sample to 10 Hz so the timestamps match su_resampled
sx_resampled = sx_up_sampled_int.resample("100ms").asfreq()
Step 2.4: Check if the two signals really are synchronized – compare the timestamps, these should be equal.
# Your code here
sur_indices = su_resampled.index
sxr_indices = sx_resampled.index
# Check if the difference between timestamps is 0
assert (sur_indices[0] - sxr_indices[0]).total_seconds() == 0
# If the difference is 0, check if it is the same for all timestamp indices
assert is_all_identical(sur_indices - sxr_indices)
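For reference, pandas also offers a built-in element-wise index comparison that expresses the same check in one line. A self-contained sketch on toy series (the helper name is hypothetical):

```python
import pandas as pd

def indices_synchronized(a: pd.Series, b: pd.Series) -> bool:
    """True iff both series carry exactly the same timestamps."""
    return a.index.equals(b.index)

idx = pd.date_range("2024-01-01", periods=5, freq="100ms")
same = pd.Series(range(5), index=idx)
shifted = pd.Series(range(5), index=idx + pd.Timedelta("50ms"))
print(indices_synchronized(same, same.copy()))  # True
print(indices_synchronized(same, shifted))      # False
```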
Step 2.5: Take both signals and insert them into a single DataFrame object (name it composed_data) which has:
signalX and signalU columns containing the corresponding values (sx_resampled and su_resampled).
# Your code here
# Create dataframe with timestamp index
composed_data = pd.DataFrame(index = su_resampled.index)
composed_data["signalX"] = sx_resampled
composed_data["signalU"] = su_resampled
composed_data
In this part you will find extreme values in the signals. Typically, these could mean outliers, sampling errors or extreme modes of operation in the system (such as overheating of a motor).
Step 3.1: Find Signal U values (su_resampled) above 170.0. Store those in variable extreme_u_vals.
# Your code here
extreme_u_vals = su_resampled[su_resampled > 170.0]
Step 3.2: Find Signal X values (sx_resampled) outside the range mean ± 2* StdDev. Store those in variable ex_static_x_vals.
# Your code here
sd = sx_resampled.std()
mean = sx_resampled.mean()
range_start = mean - 2 * sd
range_end = mean + 2 * sd
ex_static_x_vals = sx_resampled[(sx_resampled < range_start) | (sx_resampled > range_end)]
Step 3.3: Find Signal X values (sx_resampled) outside an adaptive range: 3-second moving average ± 2 * StdDev. Both the average and StdDev are calculated in a 3-second rolling window. Store the values in variable ex_dynamic_x_vals.
# Your code here
# Window of 31 samples: 30 intervals of 0.1 s = a 3-second span at 10 Hz
sx_rolling = sx_resampled.rolling(31)
rolling_mean = sx_rolling.mean()
rolling_sd = sx_rolling.std()
rolling_range_start = rolling_mean - 2 * rolling_sd
rolling_range_end = rolling_mean + 2 * rolling_sd
ex_dynamic_x_vals = sx_resampled[(sx_resampled < rolling_range_start) | (sx_resampled > rolling_range_end)]
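An alternative worth knowing: pandas also accepts time-based window sizes, which avoids hard-coding the sample count. A sketch on toy 10 Hz data (for this assignment the fixed 31-sample window is equivalent as long as the signal stays at 10 Hz):

```python
import numpy as np
import pandas as pd

# Toy uniform 10 Hz series
idx = pd.date_range("2024-01-01", periods=50, freq="100ms")
s = pd.Series(np.sin(np.arange(50) / 5.0), index=idx)

# "3s" windows adapt automatically if the sampling rate changes;
# note they start with partial windows (min_periods defaults to 1)
m_fixed = s.rolling(31).mean()   # 30 leading NaNs
m_time = s.rolling("3s").mean()  # no leading NaNs
print(m_fixed.isna().sum(), m_time.isna().sum())  # 30 0
```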
Step 4.1: Plot a line chart with values, rolling mean, normal boundaries (mean +/- 2StdDev) and the extreme values ex_dynamic_x_vals that you calculated in Part 3. Example:

# Your code here
# Plot
fig = go.Figure(data=[
    go.Scatter(name="Signal X (10 Hz)", x=sx_resampled.index, y=sx_resampled,
               line=dict(color='blue', width=1)),
    go.Scatter(name="Rolling mean", x=rolling_mean.index, y=rolling_mean,
               line=dict(color='orange', width=1)),
    go.Scatter(name="Rolling M+2S", x=rolling_range_end.index, y=rolling_range_end,
               line=dict(color='darkgreen', width=2, dash='dot')),
    go.Scatter(name="Rolling M-2S", x=rolling_range_start.index, y=rolling_range_start,
               line=dict(color='green', width=2, dash='dot')),
    go.Scatter(name="Extreme Values", x=ex_dynamic_x_vals.index, y=ex_dynamic_x_vals,
               marker=dict(color="red", size=5), mode="markers")
])
fig.update_layout(
    title={
        'text': "Signal X extreme values",
        'y': 0.9,
        'x': 0.5,
        'xanchor': 'center',
        'yanchor': 'top',
        'font': dict(size=20)
    },
    xaxis_title="Sample time",
    yaxis_title="Value"
)
fig.show()
Step 4.2: Find segments in Signal X (sx_resampled) where the 10-second moving average value is increasing for a continuous period of at least two seconds.
# Your code here
# Window of 101 samples = a 10-second span at 10 Hz
rolling_mean_10s = sx_resampled.rolling(101).mean()
# True where the moving average is increasing
filter_increasing_values = rolling_mean_10s.diff() > 0
# Reference: https://stackoverflow.com/questions/37934399/identifying-consecutive-occurrences-of-a-value
# Two seconds at 10 Hz = 20 consecutive increasing samples
threshold = 20
condition = (filter_increasing_values
             .groupby((filter_increasing_values != filter_increasing_values.shift()).cumsum())
             .transform('size') * filter_increasing_values) >= threshold
result = rolling_mean_10s[condition]
# Reindex onto the full time axis so non-increasing regions become NaN
segments = pd.Series(result, index=sx_resampled.index)
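The groupby run-length trick above is dense; here is what it does on a minimal boolean series (toy data):

```python
import pandas as pd

flags = pd.Series([False, True, True, True, False, True])
# A new run id starts every time the value changes
run_id = (flags != flags.shift()).cumsum()
# Length of the run each element belongs to
run_len = flags.groupby(run_id).transform("size")
# Keep only True values that are part of a run of length >= 3
long_runs = (run_len * flags) >= 3
print(long_runs.tolist())
# → [False, True, True, True, False, False]
```

Multiplying by `flags` zeroes out the False runs, so only sufficiently long True runs survive the threshold.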
Step 4.3: Plot a line chart and mark these regions (from previous step) in a different color.
For example: show the normal line in blue color and the continuously increasing moving average segments in green. Example:

# Your code here
# Plot
fig = go.Figure(data=[
go.Scatter(name="Signal X (10 Hz)", x=sx_resampled.index, y=sx_resampled, line = dict(color='blue', width=1)),
go.Scatter(name="Increasing MA", x=segments.index, y=segments, marker = dict(color='green'))
])
fig.update_layout(
title={
'text': "MA(X) increasing for 2+ seconds",
'y':0.9,
'x':0.5,
'xanchor': 'center',
'yanchor': 'top',
'font': dict(size=20)
},
xaxis_title="Sample time",
yaxis_title="Value"
)
fig.show()
Please reflect on the following questions:
--- YOUR ANSWERS HERE ---
1. It was the hardest of the assignments.
2. More than 20 hours.
3. Part 4.
4. Make fewer assumptions. Some of the problems will not have resources that can be referenced.
5. This felt like a different kind of exercise; previous exercise solutions did not help much.
6. The FFT and the increasing-moving-average exercises. These need more information.